#include "remm.h"

lsv_rwlock_t conn_lock;
lsv_rwlock_t list_rwlock;


lsv_s32_t remm_init(lsv_volume_proto_t *volume_proto){
	lsv_s32_t err = 0;
	nodeinfo_t info;
	nid_t remote_nid;
	char remote_cluster_name[256];
	char remote_host_name[256];
        char buf[MAX_BUF_LEN];

	DINFO("init ...\n");
	err = env_init_simple("remote_copy");
	if(err){
		DERROR("call env_init_simple failed, err: %d\n", err);
		return err;
	}

	err = stor_init(NULL, (uint64_t)-1);
	if(err){
		DERROR("call stor_init failed, err: %d\n", err);
		return err;
	}
	
	err = remm_conf_load(volume_proto);
	if(err){
		DERROR("exec remm_conf_load failed, err: %d\n", err);
		return err;
	}

	err = node_getinfo(&info, ((lsv_remm_info *)volume_proto->remm_info)->lsv_remm_remote_node_name, buf);
	if(err){
		DERROR("call node_getinfo failed, err: %d\n", err);	
	}

	memset(remote_cluster_name, 0, 256);
	memset(remote_host_name, 0, 256);
	remote_nid = info.stat->nid;
	sprintf("%s", info.clustername, strlen(info.clustername));
	sprintf("%s", info.nodename, strlen(info.nodename));



	switch(((lsv_remm_info *)volume_proto->remm_info)->lsv_remm_id){
		case LSV_REMM_SLAVE:
			err = remm_listen(volume_proto);
			if(err){
				DERROR("exec remm_listen failed, err: %d\n", err);
				return err;
			}
			remm_thread_init(volume_proto);
			break;

		case LSV_REMM_MASTER:
			INIT_LIST_HEAD(&((lsv_remm_info *)volume_proto->remm_info)->remm_list);
			lsv_rwlock_init(&conn_lock);
			lsv_rwlock_init(&list_rwlock);
			if(LSV_REMM_MODE_ASYNC == ((lsv_remm_info *)volume_proto->remm_info)->lsv_remm_id){
				remm_thread_init(volume_proto);
				remm_sync_thread_init(volume_proto);
			}
			if(LSV_SYS_LOAD == volume_proto->u.system_power_on){
			
			}
			break;

		default:
			DERROR("not start remote_mirror, lsv_remm_id is LSV_REMM_NONE!\n");
	}


	DINFO("exit ....\n");

	return err;
}

lsv_s32_t remm_destroy(lsv_volume_proto_t *volume_proto){
	remm_thread_destroy(volume_proto);
	if(LSV_REMM_SLAVE == ((lsv_remm_info *)volume_proto->remm_info)->lsv_remm_id){
		close(((lsv_remm_info *)volume_proto->remm_info)->listen_fd);
		remm_thread_destroy(volume_proto);
	}else{
		remm_sync_thread_destroy(volume_proto);
		lsv_rwlock_destroy(&conn_lock);
		lsv_rwlock_destroy(&list_rwlock);
	}
	free(volume_proto->remm_info);
	volume_proto->remm_info = NULL;
	return 0;
}

lsv_s32_t remm_receive_process_loop(lsv_volume_proto_t *volume_proto){
	lsv_s32_t err = 0;
	lsv_s32_t socket_fd = 0;
	lsv_op_type op_type;
	lsv_u64_t lba;
	lsv_u32_t size;
	lsv_s32_t sleep_times = 0;
	lsv_s32_t buf_off = 0;
	lsv_s32_t len = 0;
	lsv_s32_t op_code;
	lsv_s8_t  *data_buf = NULL;
	buffer_t  commit_buf;

	socket_fd = remm_accept(volume_proto);
	if(socket_fd < 0){
		DERROR("remm_accept return sock_fd:%d\n", socket_fd);
		err = socket_fd;
		goto EXIT;
	}

	memset((lsv_s8_t *)&op_type, 0, sizeof(lsv_op_type));

	err = read(socket_fd, (lsv_s8_t *)&op_type, sizeof(lsv_op_type));
	if(err != sizeof(lsv_op_type)){
		DERROR("read size : %d not equal the msg size: %lu\n", err, sizeof(lsv_op_type));
		err = -EINVAL;
		goto EXIT;
	}
#ifdef LSV
	DINFO("RECEIVE INFO: op_code: %d, size: %d, path: %s\n", op_type.op_code, op_type.size, op_type.path);	
#else
	DINFO("RECEIVE INFO: op_code: %d, size: %d, chkid.id: %lu, chkid.type: %u, chkid.idx: %u\n", op_type.op_code, op_type.size, op_type.chkid.id, op_type.chkid.type, op_type.chkid.idx);	
#endif
	op_code = op_type.op_code;
	size = op_type.size;
	lba = op_type.offset;
	DINFO("OP_CODE: %d, type: %d, code:%d\n", op_code, op_code & LSV_OP_TYPE_MASK, op_code & LSV_OP_CODE_MASK);
	if(LSV_OP_TYPE_MSG ==(op_code & LSV_OP_TYPE_MASK)){
		/*msg, or command*/
		switch(op_code & LSV_OP_CODE_MASK){
			case LSV_OP_CODE_SETATTR:
				break;
			case LSV_OP_CODE_TRUNCATE:
				//lsv_truncate(op_type.path, size);	
				break;
			default:
				DERROR("\n");
		};
	}else if(LSV_OP_TYPE_DATA == (op_code & LSV_OP_TYPE_MASK)){
		/*data*/
		DINFO("LSV_OP_TYPE_DATA......\n");
		switch(op_code & LSV_OP_CODE_MASK){
			case LSV_OP_CODE_READ:
				break;
			case LSV_OP_CODE_WRITE:
				DINFO("lsv_op_code_write operation\n");	
				data_buf = (lsv_s8_t *)malloc(op_type.size);
				memset(data_buf, 0, op_type.size);
				buf_off = 0;
				len = op_type.size;
				sleep_times = 0;
				while(len){
					err = read(socket_fd, data_buf + buf_off, len);
					if(err <= 0){
						if(errno == EAGAIN || errno == EINTR){
							if((++sleep_times) > 5){
								break;
							}
							continue;
						}
					}
					len -= err;
					buf_off += err;
				}

				if(buf_off != op_type.size){
					DERROR("read data error, size: %d, err: %d, errno:%d\n", op_type.size, err, errno);
					close(socket_fd);
					goto EXIT;
				}

				mbuffer_init(&commit_buf, 0);
				err = mbuffer_copy(&commit_buf, (const char *)data_buf, size);	
				if(err){
					err = -ENOMEM;
					goto EXIT;
				}
				lsv_wbuffer_append(volume_proto, lba, size, &commit_buf);
				//lsv_wbuffer_insert(volume_proto, lba, size, data_buf);
				write(socket_fd, "done", sizeof("done"));
				mbuffer_free(&commit_buf);
				break;
			default:
				DERROR("\n");
		};
	}
	close(socket_fd);

EXIT:
	return 0;
}

lsv_s32_t remm_data_send(lsv_volume_proto_t *volume_proto, lsv_s8_t * buf, lsv_u64_t lba, lsv_s32_t size){
	lsv_remm_info * remm_info = (lsv_remm_info *)volume_proto->remm_info;
	if(LSV_REMM_MODE_SYNC == remm_info->lsv_remm_mode){
		return remm_data_send_sync(volume_proto, buf, lba, size);
	}else if(LSV_REMM_MODE_ASYNC == remm_info->lsv_remm_mode){
		return remm_data_send_async(volume_proto, buf, lba, size);
	}
	return -EINVAL;
}

lsv_s32_t remm_truncate_send(lsv_volume_proto_t *volume_proto, lsv_s32_t size){
	lsv_remm_info * remm_info = (lsv_remm_info *)volume_proto->remm_info;
	if(LSV_REMM_MODE_SYNC == remm_info->lsv_remm_mode){
		return remm_truncate_send_sync(volume_proto, size);
	}else if(LSV_REMM_MODE_ASYNC == remm_info->lsv_remm_mode){
		return remm_truncate_send_async(volume_proto, size);
	}
	return -EINVAL;
}
